/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iotdb.db.mpp.plan.planner;

import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.operator.source.DataSourceOperator;
import org.apache.iotdb.db.mpp.execution.timer.RuleBasedTimeSliceAllocator;
import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;

import java.util.*;
import java.util.concurrent.ExecutorService;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public class LocalExecutionPlanContext {
    private final FragmentInstanceContext instanceContext;
    private final List<PartialPath> paths;
    // deviceId -> sensorId Set
    private final Map<String, Set<String>> allSensorsMap;
    // Used to lock corresponding query resources
    private final List<DataSourceOperator> sourceOperators;

    private final long dataRegionTTL;
    private ISinkHandle sinkHandle;

    private int nextOperatorId = 0;

    private final TypeProvider typeProvider;

    private List<TSDataType> cachedDataTypes;

    // left is cached last value in last query
    // right is full path for each cached last value
    private List<Pair<TimeValuePair, Binary>> cachedLastValueAndPathList;
    // timeFilter for last query
    private Filter lastQueryTimeFilter;
    // whether we need to update last cache
    private boolean needUpdateLastCache;

    private final RuleBasedTimeSliceAllocator timeSliceAllocator;

    // for data region
    public LocalExecutionPlanContext(
            TypeProvider typeProvider, FragmentInstanceContext instanceContext, long dataRegionTTL) {
        this.typeProvider = typeProvider;
        this.instanceContext = instanceContext;
        this.paths = new ArrayList<>();
        this.allSensorsMap = new HashMap<>();
        this.sourceOperators = new ArrayList<>();
        this.timeSliceAllocator = new RuleBasedTimeSliceAllocator();
        this.dataRegionTTL = dataRegionTTL;
    }

    // for schema region
    public LocalExecutionPlanContext(FragmentInstanceContext instanceContext) {
        this.instanceContext = instanceContext;
        this.paths = new ArrayList<>();
        this.allSensorsMap = new HashMap<>();
        this.sourceOperators = new ArrayList<>();
        this.typeProvider = null;

        // only used in `order by heat`
        this.timeSliceAllocator = new RuleBasedTimeSliceAllocator();
        // there is no ttl in schema region, so we don't care this field
        this.dataRegionTTL = Long.MAX_VALUE;
    }

    public int getNextOperatorId() {
        return nextOperatorId++;
    }

    public List<PartialPath> getPaths() {
        return paths;
    }

    public Set<String> getAllSensors(String deviceId, String sensorId) {
        Set<String> allSensors = allSensorsMap.computeIfAbsent(deviceId, k -> new HashSet<>());
        allSensors.add(sensorId);
        return allSensors;
    }

    public List<DataSourceOperator> getSourceOperators() {
        return sourceOperators;
    }

    public void addPath(PartialPath path) {
        paths.add(path);
    }

    public void addSourceOperator(DataSourceOperator sourceOperator) {
        sourceOperators.add(sourceOperator);
    }

    public void setLastQueryTimeFilter(Filter lastQueryTimeFilter) {
        this.lastQueryTimeFilter = lastQueryTimeFilter;
    }

    public void setNeedUpdateLastCache(boolean needUpdateLastCache) {
        this.needUpdateLastCache = needUpdateLastCache;
    }

    public void addCachedLastValue(TimeValuePair timeValuePair, String fullPath) {
        if (cachedLastValueAndPathList == null) {
            cachedLastValueAndPathList = new ArrayList<>();
        }
        cachedLastValueAndPathList.add(new Pair<>(timeValuePair, new Binary(fullPath)));
    }

    public List<Pair<TimeValuePair, Binary>> getCachedLastValueAndPathList() {
        return cachedLastValueAndPathList;
    }

    public ISinkHandle getSinkHandle() {
        return sinkHandle;
    }

    public void setSinkHandle(ISinkHandle sinkHandle) {
        requireNonNull(sinkHandle, "sinkHandle is null");
        checkArgument(this.sinkHandle == null, "There must be at most one SinkNode");

        this.sinkHandle = sinkHandle;
    }

    public void setCachedDataTypes(List<TSDataType> cachedDataTypes) {
        this.cachedDataTypes = cachedDataTypes;
    }

    public List<TSDataType> getCachedDataTypes() {
        return cachedDataTypes;
    }

    public TypeProvider getTypeProvider() {
        return typeProvider;
    }

    public RuleBasedTimeSliceAllocator getTimeSliceAllocator() {
        return timeSliceAllocator;
    }

    public FragmentInstanceContext getInstanceContext() {
        return instanceContext;
    }

    public Filter getLastQueryTimeFilter() {
        return lastQueryTimeFilter;
    }

    public boolean isNeedUpdateLastCache() {
        return needUpdateLastCache;
    }

    public long getDataRegionTTL() {
        return dataRegionTTL;
    }

    public ExecutorService getIntoOperationExecutor() {
        return instanceContext.getIntoOperationExecutor();
    }
}
